package com.xl.competition.module_c.old.task

import com.alibaba.fastjson.{JSON, JSONObject}
import com.xl.competition.module_c.bean.{OrderDetail, OrderInfo}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.time.Duration
import java.util.Properties

/**
 * @author: xl
 * @createTime: 2023/11/20 20:43:08
 * @program: com.xl.competition
 * @description:
 */
object Task1 {
  /**
   * Job entry point.
   *
   * Pipeline: consume order JSON from the Kafka topic "order", drop malformed
   * records, assign event-time watermarks (5s out-of-orderness), split the
   * stream into order-info / order-detail / refund / cancel branches, then:
   *  - running refund total  -> Redis key "totalrefundordercount"
   *  - cancelled orders      -> MySQL table shtd_result.order_info
   *  - running overall total -> Redis key "totalprice"
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-3145")
    // FIX: key deserializer was mistakenly set to StringSerializer (a serializer,
    // not a deserializer), which fails when the Kafka consumer is constructed.
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // With no committed offset, start from the beginning of the topic
    // ("earliest"); "latest" would start from the newest records instead.
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    // Offsets are managed by Flink, so the Kafka consumer must not auto-commit.
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    val redisConfig: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("node1")
      .setPort(6379)
      .setPassword("Abc123..")
      .build()

    val kafkaDS: DataStream[String] = env
      // Read the raw JSON strings from the "order" topic.
      .addSource(new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties))
      // Best-effort JSON validation: records that fail to parse are dropped
      // deliberately (empty catch), so downstream operators only see valid JSON.
      .process((value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]) => {
        try {
          JSON.parseObject(value)
          out.collect(value)
        } catch {
          case _: Exception =>
        }
      })
      // Event-time watermarks allowing records up to 5 seconds out of order.
      .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5L))
        // Use the later of createTime / operateTime as the event timestamp
        // (operateTime is 0 when absent, so createTime wins in that case).
        .withTimestampAssigner(new SerializableTimestampAssigner[String] {
          override def extractTimestamp(t: String, l: Long): Long = {
            val jSONObject: JSONObject = JSON.parseObject(t)
            val createTime: Long = jSONObject.getLongValue("createTime")
            val operateTime: Long = jSONObject.getLongValue("operateTime")
            if (createTime > operateTime) createTime else operateTime
          }
        })
      )

    // Side-output tags: order details, refunded orders, cancelled orders.
    val orderDetailFlag: OutputTag[OrderDetail] = new OutputTag[OrderDetail]("orderDetail")
    val refundOrder: OutputTag[OrderInfo] = new OutputTag[OrderInfo]("refundOrder")
    val cancelOrder: OutputTag[OrderInfo] = new OutputTag[OrderInfo]("cancelOrder")

    // Records containing "orderId" are order details; everything else is an
    // OrderInfo, additionally routed to refund/cancel side outputs by status.
    val orderInfoDS: DataStream[OrderInfo] = kafkaDS.process((item: String, context: ProcessFunction[String, OrderInfo]#Context, collector: Collector[OrderInfo]) => {
      try {
        if (!item.contains("orderId")) {
          val orderInfo: OrderInfo = JSON.parseObject(item, classOf[OrderInfo])
          collector.collect(orderInfo)
          if ("refund".equals(orderInfo.orderStatus)) {
            context.output(refundOrder, orderInfo)
          }
          if ("cancel".equals(orderInfo.orderStatus)) {
            context.output(cancelOrder, orderInfo)
          }
        } else {
          context.output(orderDetailFlag, JSON.parseObject(item, classOf[OrderDetail]))
        }
      } catch {
        // Records that fail deserialization are dropped (best-effort parsing).
        case _: Exception =>
      }
    })

    // Running sum of refunded order amounts -> Redis "totalrefundordercount".
    orderInfoDS.getSideOutput(refundOrder)
      .map[(String, BigDecimal)](new MapFunction[OrderInfo, (String, BigDecimal)] {
        override def map(value: OrderInfo): (String, BigDecimal) = {
          ("totalrefundordercount", value.totalAmount)
        }
      })
      // Constant key funnels every record into a single global accumulator.
      .keyBy(new KeySelector[(String, BigDecimal), String]() {
        override def getKey(value: (String, BigDecimal)): String = value._1
      })
      .reduce(new ReduceFunction[(String, BigDecimal)] {
        override def reduce(value1: (String, BigDecimal), value2: (String, BigDecimal)): (String, BigDecimal) = {
          (value1._1, value1._2.+(value2._2))
        }
      })
      .addSink(new RedisSink[(String, BigDecimal)](redisConfig, new MyRedisMapper("totalrefundordercount")))

    // Cancelled orders are persisted to MySQL via the custom JDBC sink.
    orderInfoDS.getSideOutput(cancelOrder)
      .addSink(new MySink())

    // Running sum of all order amounts -> Redis "totalprice".
    orderInfoDS
      .map[(String, BigDecimal)](new MapFunction[OrderInfo, (String, BigDecimal)] {
        override def map(value: OrderInfo): (String, BigDecimal) = {
          ("totalprice", value.totalAmount)
        }
      })
      .keyBy(new KeySelector[(String, BigDecimal), String]() {
        override def getKey(value: (String, BigDecimal)): String = value._1
      })
      .reduce(new ReduceFunction[(String, BigDecimal)] {
        override def reduce(value1: (String, BigDecimal), value2: (String, BigDecimal)): (String, BigDecimal) = {
          (value1._1, value1._2.+(value2._2))
        }
      })
      .addSink(new RedisSink[(String, BigDecimal)](redisConfig, new MyRedisMapper("totalprice")))

    env.execute()
  }


  /**
   * Maps an aggregated (label, amount) pair onto a Redis SET command,
   * always writing under the key fixed at construction time.
   *
   * @param key the Redis key this mapper writes to
   */
  class MyRedisMapper(key: String) extends RedisMapper[(String, BigDecimal)] {

    // SET is a plain string command; the second constructor argument
    // (additional key) only matters for hash commands and is ignored here.
    override def getCommandDescription: RedisCommandDescription =
      new RedisCommandDescription(RedisCommand.SET, key)

    // The constructor-supplied key wins regardless of the tuple's label.
    override def getKeyFromData(data: (String, BigDecimal)): String =
      key

    // Store the accumulated amount as its decimal string representation.
    override def getValueFromData(data: (String, BigDecimal)): String =
      data._2.toString()
  }


  /**
   * JDBC sink that inserts each cancelled order into
   * shtd_result.order_info. Resources are per-subtask: the connection and
   * prepared statement are created in open() and released in close().
   */
  class MySink extends RichSinkFunction[OrderInfo] {
    var connection: Connection = _
    var statement: PreparedStatement = _

    /** Opens the MySQL connection and prepares the reusable insert statement. */
    override def open(parameters: Configuration): Unit = {
      val url: String = "jdbc:mysql://node3:3306/shtd_result?characterEncoding=utf8"
      connection = DriverManager.getConnection(url, "root", "Abc123..")
      statement = connection.prepareStatement("insert into order_info (id, consignee, consignee_tel, final_total_amount, feight_fee) values (?,?,?,?,?)")
    }

    /** Writes one order record; parameters are overwritten on every call. */
    override def invoke(value: OrderInfo, context: SinkFunction.Context): Unit = {
      statement.setLong(1, value.id)
      statement.setString(2, value.consignee)
      statement.setString(3, value.consigneeTel)
      statement.setBigDecimal(4, value.totalAmount)
      statement.setBigDecimal(5, value.feightFee)
      statement.execute()
    }

    /**
     * Releases JDBC resources. FIX: guard against nulls — if open() failed
     * part-way (e.g. getConnection threw before prepareStatement ran), the
     * unguarded close() would throw an NPE and mask the original failure.
     */
    override def close(): Unit = {
      if (statement != null) statement.close()
      if (connection != null) connection.close()
    }
  }
}